home *** CD-ROM | disk | FTP | other *** search
- # Source Generated with Decompyle++
- # File: in.pyo (Python 2.5)
-
- import traceback
- from common import profile
- from util import unpack_pstr, pack_pstr
- from common.emailaccount import EmailAccount
- from AccountManager import NETWORK_FLAG
- from prefs import localprefprop
- SMTP_UPGRADING = True
- import smtplib
- import re
- MAXHEADERLEN = 76
-
- class SMTPsender:
- _ignore_domains = []
- addrfmt = '[\\w\\d_\\.\\-\\+=]+\\@(?:(?:[\\w\\d\\-])+\\.)+(?:[\\w\\d]{2,4})'
- shortaddr_re = re.compile('%s$' % addrfmt)
- longaddr_re = re.compile('^\\s*(.*)\\s+<(%s)>\\s*$' % addrfmt)
-
- def __init__(self, name, password, server, port = 25, use_tls = False, from_name = None, reply_to = None):
- self.user_name = name
- self.password = password
- self.smtp_server = server
- self.smtp_port = port
- self.from_name = None if from_name is not None else name
- self.replyto_email = None if reply_to is not None else name
- self._use_tls = use_tls
- self._init_pref_encoding()
-
-
- def format_header(self, key, name, email = None):
- Header = Header
- import email.Header
- maxlength = MAXHEADERLEN - (len(key) + 2)
- if maxlength < 10:
- raise AssertionError, 'Header length is too short'
-
-
- try:
- tmp = None if isinstance(name, unicode) else name
- header = Header(tmp, 'utf-8', maxlinelen = maxlength)
- except UnicodeEncodeError:
- header = Header(name, self._charset, maxlinelen = maxlength)
-
- if not email:
- return header
- else:
- return '"%s" <%s>' % (header, email)
-
-
- def add_headers(self, msg, headers):
- for h in headers:
- msg[h] = self.encode_header(h, headers[h])
-
-
-
- def get_smtp_address(self, address):
- if not address:
- return None
-
-
- def is_email(address):
- pos = address.find('@')
- if pos == -1:
- return False
-
- if address[pos + 1:].lower() in self._ignore_domains:
- return False
-
- return True
-
- if not is_email(address):
- if address == 'anonymous':
- return None
-
- if self.email_map.has_key(address):
- address = self.email_map[address]
- elif SMTPsender.nodomaddr_re.match(address):
- if self.config.getbool('notification', 'use_short_addr'):
- return address
-
- domain = self.config.get('notification', 'smtp_default_domain')
- if domain:
- address = '%s@%s' % (address, domain)
- else:
- self.env.log.info('Email address w/o domain: %s' % address)
- return None
-
-
- mo = self.shortaddr_re.search(address)
- if mo:
- return mo.group(0)
-
- mo = self.longaddr_re.search(address)
- if mo:
- return mo.group(2)
-
- self.env.log.info('Invalid email address: %s' % address)
-
-
- def _init_pref_encoding(self):
- Charset = Charset
- QP = QP
- BASE64 = BASE64
- import email.Charset
- self._charset = Charset()
- self._charset.input_charset = 'utf-8'
- pref = 'base64'
- if pref == 'base64':
- self._charset.header_encoding = BASE64
- self._charset.body_encoding = BASE64
- self._charset.output_charset = 'utf-8'
- self._charset.input_codec = 'utf-8'
- self._charset.output_codec = 'utf-8'
- elif pref in ('qp', 'quoted-printable'):
- self._charset.header_encoding = QP
- self._charset.body_encoding = QP
- self._charset.output_charset = 'utf-8'
- self._charset.input_codec = 'utf-8'
- self._charset.output_codec = 'utf-8'
- elif pref == 'none':
- self._charset.header_encoding = None
- self._charset.body_encoding = None
- self._charset.input_codec = None
- self._charset.output_charset = 'ascii'
- else:
- raise AssertionError, 'Invalid email encoding setting: %s' % pref
-
-
- def encode_header(self, key, value):
- if isinstance(value, tuple):
- return self.format_header(key, value[0], value[1])
-
- if isinstance(value, list):
- items = []
- for v in value:
- items.append(self.encode_header(v))
-
- return ',\n\t'.join(items)
-
- mo = self.longaddr_re.match(value)
- if mo:
- return self.format_header(key, mo.group(1), mo.group(2))
-
- return self.format_header(key, value)
-
-
- def begin_send(self):
- self.server = smtplib.SMTP(self.smtp_server, self.smtp_port)
- if self._use_tls:
- self.server.ehlo()
- if not self.server.esmtp_features.has_key('starttls'):
- raise AssertionError, 'TLS enabled but server does not support TLS'
-
- self.server.starttls()
- self.server.ehlo()
-
- if self.user_name:
-
- try:
- self.server.login(self.user_name, self.password)
-
-
-
-
- def send_email(self, to = '', subject = '', body = '', cc = '', bcc = ''):
- self.begin_send()
- self.send([
- to], [], subject, body)
-
- try:
- self.finish_send()
- except:
- pass
-
-
-
- def send(self, torcpts, ccrcpts, subject, body, mime_headers = { }):
- MIMEText = MIMEText
- import email.MIMEText
- headers = { }
- headers['Subject'] = subject
- headers['From'] = (self.from_name, self.from_name)
-
- def build_addresses(rcpts):
- return []([], [ self.get_smtp_address(addr) for addr in rcpts ])
-
-
- def remove_dup(rcpts, all):
- tmp = []
- for rcpt in rcpts:
- if rcpt not in all:
- tmp.append(rcpt)
- all.append(rcpt)
- continue
-
- return (tmp, all)
-
- toaddrs = build_addresses(torcpts)
- recipients = []
- (toaddrs, recipients) = remove_dup(toaddrs, recipients)
- if len(recipients) < 1:
- return None
-
- headers['To'] = ', '.join(toaddrs)
- msg = MIMEText(body.encode('utf-8'), 'plain')
- del msg['Content-Transfer-Encoding']
- msg.set_charset(self._charset)
- self.add_headers(msg, headers)
- self.add_headers(msg, mime_headers)
- msgtext = msg.as_string()
- recrlf = re.compile('\r?\n')
- msgtext = '\r\n'.join(recrlf.split(msgtext))
- self.server.sendmail(msg['From'], recipients, msgtext)
-
-
- def finish_send(self):
- if self._use_tls:
- import socket
-
- try:
- self.server.quit()
- except socket.sslerror:
- pass
- except:
- None<EXCEPTION MATCH>socket.sslerror
-
-
- None<EXCEPTION MATCH>socket.sslerror
- self.server.quit()
-
-
- from common.emailaccount import EmailAccount
- from util import threaded
-
- class SMTPEmailAccount(EmailAccount):
- DEFAULT_SMTP_REQUIRE_SSL = False
- DEFAULT_SMTP_PORT = 25
-
- def __init__(self, **options):
- d = self.default
- self.smtp_server = options.pop('smtp_server', d('smtp_server'))
- self.smtp_require_ssl = options.pop('smtp_require_ssl', d('smtp_require_ssl'))
- self.smtp_port = options.pop('smtp_port', d('smtp_port'))
- self.smtp_username = options.pop('smtp_username', d('smtp_username'))
- self._email_address = options.pop('email_address', d('email_address'))
- self._encrypted_smtppw = profile.crypt_pw(options.pop('smtp_password', d('smtp_password')))
- self._encrypted_pw = options.pop('password')
- EmailAccount.__init__(self, password = self.password, **options)
-
-
- def get_email_address(self):
- return self._email_address
-
-
- def set_email_address(self, val):
- self._email_address = val
-
-
- def from_name(self):
- return self.email_address
-
- from_name = property(from_name)
-
- def _unglue_pw(cls, password):
- passwordstr = profile.plain_pw(password).encode('utf-8')
- (password, r) = unpack_pstr(passwordstr)
-
- try:
- (smtppassword, r) = unpack_pstr(r)
- except:
- (smtppassword, r) = (u'', u'')
-
- return (profile.crypt_pw(password.decode('utf-8')), profile.crypt_pw(smtppassword.decode('utf-8')))
-
- _unglue_pw = classmethod(_unglue_pw)
-
- def _set_password(self, password):
-
- try:
- (self._encrypted_pw, self._encrypted_smtppw) = self._unglue_pw(password)
- except Exception:
- e = None
- traceback.print_exc()
- self._encrypted_pw = password
- self._encrypted_smtppw = ''
-
-
-
- def _glue_pw(cls, encrypted_pw, encrypted_smtppw):
- password = pack_pstr(profile.plain_pw(encrypted_pw).encode('utf-8')).decode('utf-8')
- smtppw = pack_pstr(profile.plain_pw(encrypted_smtppw).encode('utf-8')).decode('utf-8')
- return profile.crypt_pw(password + smtppw)
-
- _glue_pw = classmethod(_glue_pw)
-
- def _get_password(self):
- return self._glue_pw(self._encrypted_pw, self._encrypted_smtppw)
-
- password = property(_get_password, _set_password)
-
- def _decryptedpw(self):
- return profile.plain_pw(self._encrypted_pw)
-
-
- def _decrypted_smtppw(self):
- return profile.plain_pw(self._encrypted_smtppw)
-
- smtp_password = property(_decrypted_smtppw)
-
- def update_info(self, **info):
- if not self.isflagged(NETWORK_FLAG):
- if 'password' in info:
- self._encrypted_pw = info.pop('password')
-
- if 'smtp_password' in info:
- self._encrypted_smtppw = profile.crypt_pw(info.pop('smtp_password'))
-
- else:
- self.password = info.pop('password')
- EmailAccount.update_info(self, **info)
-
-
- def _get_options(self):
- opts = EmailAccount._get_options(self)
- opts.update((dict,)((lambda .0: for a in .0:
- (a, getattr(self, a)))('smtp_server smtp_port smtp_require_ssl smtp_username email_address'.split())))
- return opts
-
-
- def from_net(cls, info):
- password = info.password
-
- try:
- (encrypted_pw, encrypted_smtppw) = cls._unglue_pw(password)
- except Exception:
- e = None
- traceback.print_exc()
- encrypted_pw = password
- encrypted_smtppw = u''
-
- info.password = encrypted_pw
- smtppw = profile.plain_pw(encrypted_smtppw)
- return EmailAccount.from_net(info, smtp_password = smtppw)
-
- from_net = classmethod(from_net)
-
- def send_email(self, to = '', subject = '', body = '', cc = '', bcc = ''):
- if not self.smtp_username:
- pass
- un = self.username
- f = self.from_name
- if not self._decrypted_smtppw():
- pass
- password = None if not self.smtp_username else ''
- srv = self.smtp_server
- if srv in ('smtp.aol.com', 'smtp.aim.com'):
- if un.endswith('aol.com') or un.endswith('aim.com'):
- f = un
- else:
- f = un + u'@aol.com'
-
- s = SMTPsender(un, password, srv, from_name = f, use_tls = self.smtp_require_ssl)
- s.send_email(to = to, subject = subject, body = body, cc = cc, bcc = bcc)
-
- send_email = threaded(send_email)
- mailclient = localprefprop(EmailAccount.mailclient_localprefs_key, 'sysdefault')
-
-